#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "lich_md.h"
#include "net_table.h"
#include "configure.h"
#include "../replica/replica.h"
#include "table_proto.h"
#include "pool_proto.h"
#include "pool_rename.h"
#include "md_proto.h"
#include "net_global.h"
#include "job_dock.h"
#include "ylog.h"
#include "dbg.h"

/* Short local alias; name-table lookups in this file are stored in entry_t pointers. */
typedef pool_proto_entry_t entry_t;

/*
 * Record a rename journal entry on the pool as the LICH_SYSTEM_ATTR_RENAME
 * xattr.
 *
 * The record has the textual form
 *     from:(<chkid>)/(<base64 name>), to:(<chkid>)/(<base64 name>)
 * Names are base64-encoded (including their terminating NUL) before being
 * embedded in the record.
 *
 * @pool_proto  pool that owns the journal
 * @from        id of the source directory
 * @_fromname   source entry name (plain text)
 * @to          id of the destination directory
 * @_toname     destination entry name (plain text)
 *
 * Returns 0 on success, EAGAIN when a journal entry already exists
 * (xattr_set reported EEXIST), ENAMETOOLONG when the formatted record does
 * not fit in the local buffer, or the error returned by xattr_set.
 *
 * NOTE(review): the encoded names are written into MAX_NAME_LEN buffers;
 * base64 output is longer than its input, so this presumably relies on the
 * callers bounding the name length -- confirm.
 */
static int __pool_rename_set_log(pool_proto_t *pool_proto, const chkid_t *from,
                            const char *_fromname, const chkid_t *to, const char *_toname)
{
        int ret, len;
        char buf[MAX_BUF_LEN];
        char fromname[MAX_NAME_LEN], toname[MAX_NAME_LEN];

        base64_encode(_fromname, strlen(_fromname) + 1, fromname);
        base64_encode(_toname, strlen(_toname) + 1, toname);

        /* bound the write by the real size of buf, not an unrelated constant */
        len = snprintf(buf, sizeof(buf), "from:("CHKID_FORMAT")/(%s), to:("CHKID_FORMAT")/(%s)",
                       CHKID_ARG(from), fromname, CHKID_ARG(to), toname);
        if (len < 0 || (size_t)len >= sizeof(buf)) {
                /* never store a truncated record: it could not be parsed back */
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = pool_proto->xattr_set(pool_proto, LICH_SYSTEM_ATTR_RENAME,
                                  buf, strlen(buf) + 1, O_EXCL);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        /* another rename is already journaled on this pool */
                        DINFO("pool_rename @ "CHKID_FORMAT" need retry again\n",
                              CHKID_ARG(&pool_proto->chkid));
                        ret = EAGAIN;
                }

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Read back and parse the rename journal stored in the
 * LICH_SYSTEM_ATTR_RENAME xattr.
 *
 * @pool_proto  pool that owns the journal
 * @_from       out, optional: id of the source directory
 * @_fromname   out: decoded source entry name
 * @_to         out, optional: id of the destination directory
 * @_toname     out: decoded destination entry name
 *
 * Returns 0 on success or the error returned by xattr_get. A record that
 * does not match the expected layout trips YASSERT.
 *
 * NOTE(review): the sscanf conversions are unbounded; this relies on the
 * writer keeping each field shorter than MAX_NAME_LEN -- confirm.
 */
static int __pool_rename_get_log(pool_proto_t *pool_proto, chkid_t *_from,
                            char *_fromname, chkid_t *_to, char *_toname)
{
        char buf[MAX_BUF_LEN], str1[MAX_NAME_LEN], str2[MAX_NAME_LEN];
        char fromname[MAX_NAME_LEN], toname[MAX_NAME_LEN];
        /* advertise the real capacity of buf so long records are not cut short */
        int ret, len, buflen = (int)sizeof(buf);

        ret = pool_proto->xattr_get(pool_proto, LICH_SYSTEM_ATTR_RENAME, buf, &buflen);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = sscanf(buf, "from:(%[^)])/(%[^)]), to:(%[^)])/(%[^)])",
                     str1, fromname, str2, toname);
        YASSERT(ret == 4);

        /* names were stored base64-encoded; hand back the plain text */
        base64_decode(fromname, &len, _fromname);
        base64_decode(toname, &len, _toname);

        if (_from)
                str2chkid(_from, str1);
        if (_to)
                str2chkid(_to, str2);

        return 0;
err_ret:
        return ret;
}

/*
 * Read back and parse the rename lock stored in the
 * LICH_SYSTEM_ATTR_RENAME_LOCK xattr.
 *
 * @pool_proto  pool that holds the lock
 * @_from       out, optional: id recorded in the lock
 * @_fromname   out: decoded name recorded in the lock
 *
 * Returns 0 on success or the error returned by xattr_get. A record that
 * does not match the expected layout trips YASSERT.
 *
 * NOTE(review): the sscanf conversions are unbounded; this relies on the
 * writer keeping each field shorter than MAX_NAME_LEN -- confirm.
 */
static int  __pool_rename_get_lock(pool_proto_t *pool_proto, chkid_t *_from,
                                   char *_fromname)
{
        char buf[MAX_BUF_LEN], str1[MAX_NAME_LEN];
        char fromname[MAX_NAME_LEN];
        /* advertise the real capacity of buf so long records are not cut short */
        int ret, len, buflen = (int)sizeof(buf);

        ret = pool_proto->xattr_get(pool_proto, LICH_SYSTEM_ATTR_RENAME_LOCK, buf, &buflen);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = sscanf(buf, "from:(%[^)])/(%[^)])",
                     str1, fromname);
        YASSERT(ret == 2);

        /* the name was stored base64-encoded; hand back the plain text */
        base64_decode(fromname, &len, _fromname);

        if (_from)
                str2chkid(_from, str1);

        return 0;
err_ret:
        return ret;
}

/*
 * Store a rename lock on the pool as the LICH_SYSTEM_ATTR_RENAME_LOCK xattr.
 *
 * The record has the textual form
 *     from:(<chkid>)/(<base64 name>)
 *
 * @pool_proto  pool to lock
 * @src         id written into the lock record
 * @_name       entry name (plain text) written into the lock record
 *
 * Returns 0 on success, EAGAIN when a lock already exists (xattr_set
 * reported EEXIST), ENAMETOOLONG when the formatted record does not fit in
 * the local buffer, or the error returned by xattr_set.
 */
static int  __pool_rename_set_lock(pool_proto_t *pool_proto, const chkid_t *src, const  char *_name)
{
        int ret, len;
        char buf[MAX_BUF_LEN], name[MAX_BUF_LEN];

        base64_encode(_name, strlen(_name) + 1, name);

        /* bound the write by the real size of buf, not an unrelated constant */
        len = snprintf(buf, sizeof(buf), "from:("CHKID_FORMAT")/(%s)",
                       CHKID_ARG(src), name);
        if (len < 0 || (size_t)len >= sizeof(buf)) {
                /* never store a truncated record: it could not be parsed back */
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = pool_proto->xattr_set(pool_proto,
                                   LICH_SYSTEM_ATTR_RENAME_LOCK, buf, strlen(buf) + 1, O_EXCL);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        /* somebody else already holds the rename lock */
                        DINFO("pool_rename @ "CHKID_FORMAT" need retry again\n",
                              CHKID_ARG(&pool_proto->chkid));
                        ret = EAGAIN;
                }

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/* Drop the rename journal xattr; returns whatever xattr_remove returns. */
static int __pool_rename_remove_log(pool_proto_t *pool_proto)
{
        int ret;

        ret = pool_proto->xattr_remove(pool_proto, LICH_SYSTEM_ATTR_RENAME);

        return ret;
}

/* Drop the rename lock xattr; returns whatever xattr_remove returns. */
static int __pool_rename_remove_lock(pool_proto_t *pool_proto)
{
        int ret;

        ret = pool_proto->xattr_remove(pool_proto, LICH_SYSTEM_ATTR_RENAME_LOCK);

        return ret;
}

/*
 * Rename an entry inside one pool: update the stored record under the new
 * name and re-key the in-memory name table.
 *
 * @pool_proto  pool containing the entry
 * @fromname    current name
 * @toname      new name
 * @redo        non-zero when replaying a journal; in that case finding only
 *              @toname in the table means the rename already happened and
 *              nothing is done
 *
 * Returns 0 on success or the error from pool_proto_update (in which case
 * pool_proto->ltime is reset to 0). Inconsistent table state trips YASSERT.
 */
static int __pool_rename_local__(pool_proto_t *pool_proto,
                               const char *fromname, const char *toname, int redo)
{
        int ret;
        entry_t *ent, *ent1;

        ent = hash_table_find(pool_proto->name_tab, (void *)fromname);
        if (ent == NULL) {
                ent = hash_table_find(pool_proto->name_tab, (void *)toname);
                if (ent) {
                        if (redo) {
                                /* journal replay and the entry already carries the new name */
                                DINFO("redo nothing");
                                goto out;
                        } else {
                                YASSERT(0);
                        }
                } else {
                        YASSERT(0);
                }
        }

        ret = pool_proto_update(pool_proto, toname, ent->chkinfo, &ent->loc);
        if (unlikely(ret)) {
                pool_proto->ltime = 0;
                GOTO(err_ret, ret);
        }

        /* unhook the entry from its old key before its name buffer is replaced */
        ret = hash_table_remove(pool_proto->name_tab, fromname, (void **)&ent1);
        if (unlikely(ret))
                YASSERT(0);

        yfree((void **)&ent->name);
        ret = ymalloc((void **)&ent->name, strlen(toname) + 1);
        if (unlikely(ret))
                YASSERT(0);

        memcpy(ent->name, toname, strlen(toname) + 1);

        ret = hash_table_insert(pool_proto->name_tab, (void *)ent, ent->name, 0);
        if (unlikely(ret))
                YASSERT(0);

        /* the entry removed from the table must be the one that was looked up */
        YASSERT(ent == ent1);

out:
        return 0;
err_ret:
        return ret;
}

/*
 * Pre-flight check for a rename inside one pool.
 *
 * Returns ENOENT when @fromname is not in the name table, EPERM when
 * @toname already exists with a different chunk id type than @fromname,
 * and 0 otherwise (an existing @toname of the same type only logs a
 * warning).
 */
static int __pool_rename_local_check(pool_proto_t *pool_proto, const char *fromname,
                                        const char *toname)
{
        int ret, srctype;
        entry_t *src, *dst;

        src = hash_table_find(pool_proto->name_tab, (void *)fromname);
        if (src == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        srctype = src->chkinfo->id.type;

        dst = hash_table_find(pool_proto->name_tab, (void *)toname);
        if (dst == NULL)
                return 0;

        DWARN(""CHKID_FORMAT"/%s exist\n", CHKID_ARG(&pool_proto->chkid), toname);

        if (srctype != (int)dst->chkinfo->id.type) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Journaled rename inside one pool: check, write the journal, apply the
 * rename, then remove the journal.
 *
 * Failures before the rename is attempted are returned directly; failures
 * from the rename itself or from removing the journal also reset
 * pool_proto->ltime to 0.
 */
static int __pool_rename_local(pool_proto_t *pool_proto, const char *fromname,
                                    const char *toname)
{
        int ret;
        const chkid_t *self = &pool_proto->chkid;

        ret = __pool_rename_local_check(pool_proto, fromname, toname);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* source and destination directory are this same pool */
        ret = __pool_rename_set_log(pool_proto, self, fromname, self, toname);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __pool_rename_local__(pool_proto, fromname, toname, 0);
        if (unlikely(ret)) {
                GOTO(err_reload, ret);
        }

        ret = __pool_rename_remove_log(pool_proto);
        if (unlikely(ret)) {
                GOTO(err_reload, ret);
        }

        return 0;
err_reload:
        pool_proto->ltime = 0;
err_ret:
        return ret;
}

/*
 * Move an entry from this pool to another directory.
 *
 * Steps, in order: take the rename lock on the destination (retrying a few
 * times on EBUSY), re-parent the chunk, release the destination lock with
 * the chunk info, and finally delete the entry from this pool.
 *
 * @pool_proto  source pool
 * @fromname    entry name in the source pool
 * @to          id of the destination directory
 * @toname      name to use in the destination
 * @force       passed through to md_rename_lock
 *
 * Returns 0 on success, ENOENT when @fromname is not in the name table, or
 * the error of the failing step.
 *
 * NOTE(review): when @fromname is missing, or a later step fails, the
 * destination lock taken at the top is not released here -- confirm this is
 * recovered elsewhere.
 */
static int __pool_rename_remote__(pool_proto_t *pool_proto, const char *fromname,
                             const fileid_t *to, const char *toname, int force)
{
        int ret, retry = 0;
        entry_t *ent;

retry:
        ret = md_rename_lock(to, &pool_proto->chkid, toname, force);
        if (unlikely(ret)) {
                if (ret == EBUSY) {
                        USLEEP_RETRY1(err_ret, ret, retry, retry, 3, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        ent = hash_table_find(pool_proto->name_tab, (void *)fromname);
        if (ent == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        ret = chunk_proto_setparent(ent->chkinfo, to);
        if (unlikely(ret)) {
                YASSERT(ret != EEXIST);
                GOTO(err_ret, ret);
        }

        ret = md_rename_unlock(to, ent->chkinfo);
        if (unlikely(ret)) {
                YASSERT(ret != EEXIST);
                GOTO(err_ret, ret);
        }

        ret = pool_proto_del(pool_proto, fromname);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Journaled move of an entry to another directory: write the journal,
 * perform the move, then remove the journal.
 *
 * Returns 0 on success. When the move reports EEXIST the journal is removed
 * and EEXIST is returned to the caller. Any other failure of the move, or a
 * failure to remove the journal, resets pool_proto->ltime to 0 and returns
 * that error. A failure to write the journal is returned directly.
 */
static int __pool_rename_remote(pool_proto_t *pool_proto, const char *fromname,
                           const fileid_t *to, const char *toname)
{
        int ret;

        ret = __pool_rename_set_log(pool_proto, &pool_proto->chkid, fromname, to, toname);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_rename_remote__(pool_proto, fromname, to, toname, 0);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        /*
                         * The move was refused (the later steps of the move
                         * assert against EEXIST, so it is expected from the
                         * lock step). Drop the journal exactly once and
                         * report EEXIST instead of falling through to a
                         * second removal.
                         */
                        ret = __pool_rename_remove_log(pool_proto);
                        if (unlikely(ret))
                                GOTO(err_reload, ret);

                        ret = EEXIST;
                        GOTO(err_ret, ret);
                } else
                        GOTO(err_reload, ret);
        }

        ret = __pool_rename_remove_log(pool_proto);
        if (unlikely(ret))
                GOTO(err_reload, ret);

        return 0;
err_reload:
        pool_proto->ltime = 0;
err_ret:
        return ret;
}

/*
 * Job state machine used by pool_rename_lock_notify: looks up the
 * attributes of the file id carried in the job context and logs a warning
 * if that fails. The follow-up work is still unimplemented.
 */
static void __pool_rename_lock_stm(job_t *job, int *keep)
{
        int ret;
        fileinfo_t attr;
        fileid_t *target;

        *keep = 0;
        target = job->context;

        DINFO("notify "CHKID_FORMAT"\n", CHKID_ARG(target));

        ret = md_getattr(target, &attr);
        if (unlikely(ret)) {
                DWARN("check "CHKID_FORMAT" fail, ret %d\n", CHKID_ARG(target), ret);
        }

        UNIMPLEMENTED(__WARN__);//need check peer log
}

/*
 * Schedule an asynchronous job that checks on the given file id.
 *
 * A job named "pool_rename_notify" is created, a copy of @_fileid is stored
 * in its context, and it is queued with __pool_rename_lock_stm as its state
 * machine.
 *
 * @_fileid  id to check; NULL is accepted and makes the call a no-op,
 *           because there is nothing to copy into the job context.
 */
void pool_rename_lock_notify(const fileid_t *_fileid)
{
        int ret;
        job_t *job;
        fileid_t *fileid;

        if (_fileid == NULL) {
                /* without an id the job would dereference NULL */
                DWARN("notify without fileid, skip\n");
                return;
        }

        DINFO("notify "CHKID_FORMAT"\n", CHKID_ARG(_fileid));

        ret = job_create(&job, &jobtracker, "pool_rename_notify");
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = job_context_create(job, sizeof(*fileid));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        /* the job owns a private copy, so the caller's id need not outlive this call */
        fileid = job->context;
        *fileid = *_fileid;
        job->status = STATUS_PREP;
        job->state_machine = __pool_rename_lock_stm;

        job_exec(job_handler(job, 0), 0, EXEC_INDIRECT);
}

/*
 * Restore the in-memory lock flag from the persisted rename lock.
 *
 * When no lock record exists (ENOKEY) nothing is done and 0 is returned.
 * Otherwise the id recorded in the lock is notified and
 * pool_proto->name_locked is set. Other read errors are returned.
 */
int pool_rename_load_lock(pool_proto_t *pool_proto)
{
        int ret;
        fileid_t owner;
        char lockname[MAX_NAME_LEN];

        ret = __pool_rename_get_lock(pool_proto, &owner, lockname);
        if (unlikely(ret)) {
                if (ret != ENOKEY)
                        GOTO(err_ret, ret);

                /* no persisted lock: nothing to restore */
                return 0;
        }

        DINFO("notify "CHKID_FORMAT"\n", CHKID_ARG(&owner));

        pool_rename_lock_notify(&owner);

        pool_proto->name_locked = 1;

        return 0;
err_ret:
        return ret;
}

static int __pool_rename_lock(struct __pool_proto *pool_proto,
                const fileid_t *src, const char *name)
{
        int ret;
        entry_t *ent;

        if (pool_proto->name_locked) {
                pool_rename_lock_notify(src);
                ret = EBUSY;
                GOTO(err_ret, ret);
        }

        ent = hash_table_find(pool_proto->name_tab, (void *)name);
        if (ent) {
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        ret = __pool_rename_set_lock(pool_proto, src, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        pool_proto->name_locked = 1;

        return 0;
err_ret:
        return ret;
}

/*
 * Re-validate an existing rename lock (the "force" path of
 * pool_rename_lock).
 *
 * Succeeds only when the pool is currently locked, @_name is not already
 * in the name table, and the persisted lock record carries exactly @_src
 * and @_name.
 *
 * Returns 0 on a match; EINVAL when the pool is not locked, the name
 * already exists, the lock record is missing (ENOKEY) or does not match;
 * otherwise the error from reading the lock record.
 */
static int __pool_rename_relock(struct __pool_proto *pool_proto,
                           const fileid_t *_src, const char *_name)
{
        int ret;
        entry_t *ent;
        fileid_t src;
        char name[MAX_NAME_LEN];

        if (!pool_proto->name_locked) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        /* look up the requested name; the local buffer is not filled in yet */
        ent = hash_table_find(pool_proto->name_tab, (void *)_name);
        if (ent) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = __pool_rename_get_lock(pool_proto, &src, name);
        if (unlikely(ret)) {
                if (ret == ENOKEY) {
                        ret = EINVAL;
                        GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

        if (chkid_cmp(&src, _src) || strcmp(name, _name)) {
                ret = EINVAL;
                DERROR("chkid "CHKID_FORMAT" : "CHKID_FORMAT", name %s %s\n",
                       CHKID_ARG(&src), CHKID_ARG(_src), name, _name);
                SWARN(0, "%s, rename fail, chkid "CHKID_FORMAT" : "CHKID_FORMAT", name %s %s\n",
                       M_DATA_CHUNK_WARN, CHKID_ARG(&src), CHKID_ARG(_src), name, _name);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Entry point for locking a pool for rename: a non-zero @force re-validates
 * an existing lock, zero takes a fresh one.
 */
int pool_rename_lock(struct __pool_proto *pool_proto,
                const fileid_t *src, const char *name, int force)
{
        if (force != 0)
                return __pool_rename_relock(pool_proto, src, name);

        return __pool_rename_lock(pool_proto, src, name);
}

/*
 * Complete an incoming rename: insert @chkinfo under the name recorded in
 * the lock, remove the lock record and clear pool_proto->name_locked.
 *
 * Returns 0 on success or the error of the first failing step; the lock
 * flag is only cleared when every step succeeded.
 */
int pool_rename_unlock(struct __pool_proto *pool_proto, const chkinfo_t *chkinfo)
{
        int ret;
        chkid_t owner;
        char lockname[MAX_NAME_LEN];

        ret = __pool_rename_get_lock(pool_proto, &owner, lockname);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* the entry lands under the name that was reserved by the lock */
        ret = pool_proto_insert(pool_proto, lockname, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __pool_rename_remove_lock(pool_proto);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        pool_proto->name_locked = 0;

        return 0;
err_ret:
        return ret;
}

/*
 * Common pre-flight check for pool_rename1.
 *
 * Returns EPERM when this pool's parent id is the root id, ENOENT when
 * @fromname is not in the name table, 0 otherwise. @from, @to and @toname
 * are currently unused.
 */
static int __pool_rename_check1(pool_proto_t *pool_proto, const fileid_t *from,
                const char *fromname, const fileid_t *to, const char *toname)
{
        int ret;

        (void) from;
        (void) to;
        (void) toname;

        if (chkid_isroot(&pool_proto->parentid)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        if (hash_table_find(pool_proto->name_tab, (void *)fromname) == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Rename @fromname in this pool to @toname in directory @to.
 *
 * When @from and @to are the same id the rename is done inside this pool;
 * otherwise the entry is moved to the other directory.
 *
 * Returns 0 on success, EAGAIN when this pool is currently rename-locked,
 * or the error from the pre-flight check or from the rename itself.
 */
int pool_rename1(pool_proto_t *pool_proto, const fileid_t *from,
                const char *fromname, const fileid_t *to, const char *toname)
{
        int ret;

        if (pool_proto->name_locked) {
                fileid_t owner;
                char lockname[MAX_NAME_LEN];

                /*
                 * Notify the id recorded in the lock rather than passing
                 * NULL, which the notifier would dereference. If the lock
                 * record cannot be read the notification is skipped.
                 */
                ret = __pool_rename_get_lock(pool_proto, &owner, lockname);
                if (ret == 0)
                        pool_rename_lock_notify(&owner);

                DWARN(""CHKID_FORMAT" locked\n", CHKID_ARG(&pool_proto->chkid));
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = __pool_rename_check1(pool_proto, from, fromname, to, toname);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (chkid_cmp(from, to) == 0) {
                ret = __pool_rename_local(pool_proto, fromname, toname);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = __pool_rename_remote(pool_proto, fromname, to, toname);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Replay a pending rename journal, if any.
 *
 * Reads the journal record; when none exists (ENOKEY) returns 0 without
 * doing anything. A journal whose source and destination ids are equal is
 * replayed as a local rename in redo mode and the journal is then removed.
 * A journal pointing at a different destination is logged and rejected
 * with EPERM.
 *
 * Returns 0 on success or when there is nothing to replay, otherwise the
 * error of the failing step.
 */
int pool_rename_load(pool_proto_t *pool_proto)
{
        int ret;
        char from[MAX_NAME_LEN], to[MAX_NAME_LEN];
        fileid_t fromid, toid;

        ret = __pool_rename_get_log(pool_proto, &fromid, from, &toid, to);
        if (unlikely(ret)) {
                /* ENOKEY: no journal record, nothing to replay */
                if (ret == ENOKEY)
                        goto out;
                else
                        GOTO(err_ret, ret);
        }

        if (chkid_cmp(&fromid, &toid) == 0) {
                /* same directory on both sides: redo the local rename */
                ret = __pool_rename_local__(pool_proto, from, to, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                DINFO("pool_rename from "CHKID_FORMAT"/%s to "CHKID_FORMAT"/%s\n",
                      CHKID_ARG(&pool_proto->chkid), from, CHKID_ARG(&toid), to);

                /*
                 * Cross-directory replay is rejected here.
                 * NOTE(review): assuming GOTO jumps unconditionally, the
                 * rest of this branch is never executed -- confirm whether
                 * it is intentionally disabled.
                 */
                ret = EPERM;
                GOTO(err_ret, ret);

                ret = __pool_rename_remote__(pool_proto, from, &toid, to, 0);
                if (unlikely(ret)) {
                retry:
                        if (ret == EBUSY) {
                                /* destination busy: try again with force set */
                                ret = __pool_rename_remote__(pool_proto, from, &toid, to, 1);
                                YASSERT(ret != EBUSY);
                                goto retry;
                        } else if (ret == ENOENT) {
                                DERROR("pool_rename from "CHKID_FORMAT"/%s to "CHKID_FORMAT"/%s\n",
                                       CHKID_ARG(&pool_proto->chkid), from, CHKID_ARG(&toid), to);
                                SWARN(0, "%s, pool_rename from "CHKID_FORMAT"/%s to "CHKID_FORMAT"/%s\n",
                                       M_DATA_CHUNK_WARN, CHKID_ARG(&pool_proto->chkid), from, CHKID_ARG(&toid), to);

                                UNIMPLEMENTED(__DUMP__);
                                ret = __pool_rename_remove_log(pool_proto);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);
                        } else
                                GOTO(err_ret, ret);
                }
        }

        /* replay finished: the journal record is no longer needed */
        ret = __pool_rename_remove_log(pool_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

out:
        return 0;
err_ret:
        return ret;
}
